package com.atguigu.day08

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object $01_WordCount {

  /**
   * Spark Streaming word count over a TCP socket source.
   *
   * Reads lines from hadoop102:9999 in 5-second micro-batches, splits each
   * line on spaces, counts words per batch, and prints the counts to stdout.
   * Blocks until the streaming context is terminated externally.
   */
  def main(args: Array[String]): Unit = {

    // 1. Create the StreamingContext: local mode with 4 threads
    //    (at least 2 are required — one for the receiver, one for processing),
    //    batch interval of 5 seconds.
    val conf = new SparkConf().setMaster("local[4]").setAppName("test")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Suppress Spark's INFO logging so batch output is readable.
    ssc.sparkContext.setLogLevel("error")

    // 2. Pull data: one receiver reading text lines from the socket.
    val ds = ssc.socketTextStream("hadoop102", 9999)

    // 3. Transform: split lines into words...
    val ds2 = ds.flatMap(line => {
      // Uncomment to simulate processing time exceeding the batch interval
      // (demonstrates batch scheduling delay).
      //Thread.sleep(8000)
      line.split(" ")
    })

    // ...pair each word with 1, then sum counts per word within the batch.
    val ds3 = ds2.map((_, 1))
    val ds4 = ds3.reduceByKey(_ + _)

    // 4. Output operation: print each batch's counts (required to trigger
    //    execution — DStream transformations are lazy).
    ds4.print()

    // 5. Start the streaming computation.
    ssc.start()
    // 6. Block the main thread until externally terminated.
    ssc.awaitTermination()
  }
}
